今天要來寫 Seq2seq 的文章了,而這個 Seq2seq 的文章是一系列的文章,下一章節會在增加使用 self-aatention 的機制,然後還會使用 Transformer ,不過這邊不會講太多細節,如果想要知道細節的網路上的資源很多,這邊就來做程式碼的觀賞
!pip install torch==1.8.0+cu111 torchvision==0.9.0+cu111 torchaudio==0.8.0 torchtext==0.9.0 -f https://download.pytorch.org/whl/torch_stable.html
!pip install spacy
exit() # restart kernal
再來 import 的部分,因為後面版本整個 torchtext.legacy.data
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.legacy.data import Field, TabularDataset, BucketIterator
import numpy as np
import spacy
import random
from torch.utils.tensorboard import SummaryWriter # to print to tensorboard
# from utils import translate_sentence, bleu, save_checkpoint, load_checkpoint
from torchtext.data.metrics import bleu_score
import sys
import os
import torch, torchtext
print(torch.__version__) # 1.8.0 + cu111
print(torchtext.__version__) # 0.9.0
使用Google Driver
from google.colab import drive
!python -m spacy download zh_core_web_sm
!python -m spacy download en_core_web_sm
然後這邊就是先處理斷詞,同時把資料切成 train_data
和 test_data
spacy_zh = spacy.load("zh_core_web_sm")
spacy_eng = spacy.load("en_core_web_sm")
def tokenize_eng(text):
return [tok.text for tok in spacy_eng.tokenizer(text)]
def tokenize_zh(text):
return [tok.text for tok in spacy_zh.tokenizer(text)]
english = Field(sequential=True, use_vocab=True, tokenize=tokenize_eng, lower=True)
chinese = Field(sequential=True, use_vocab=True, tokenize=tokenize_zh, lower=True)
fields = {"English": ("eng", english), "Chinese": ("zh", chinese)}
train_data, test_data = TabularDataset.splits(
path="/content/drive/MyDrive/Colab Notebooks/ithome/torchtext_preprocess", train="t3_train.json", test="t3_test.json", format="json", fields=fields
然後建立自己的 vocabulary
english.build_vocab(train_data, max_size=50000, min_freq=20, vectors="glove.6B.100d")
chinese.build_vocab(train_data, max_size=50000, min_freq=50, vectors="glove.6B.100d")
把句子直接做翻譯的 Function ,我就不另外寫了
def translate_sentence(model, sentence, chinese, english, device, max_length=25):
# 先載入 tokensizer
spacy_zh = spacy.load("zh_core_web_sm")
# Create tokens using spacy and everything in lower case (which is what our vocab is)
if type(sentence) == str:
tokens = [token.text.lower() for token in spacy_zh(sentence)]
tokens = [token.lower() for token in sentence]
# 加入開始符號跟結束符號,代表一個句子
tokens.insert(0, chinese.init_token)
# 然後把文字轉自 vactor
text_to_indices = [chinese.vocab.stoi[token] for token in tokens]
# 再把 vactor list 轉換成 Tensor
sentence_tensor = torch.LongTensor(text_to_indices).unsqueeze(1).to(device)
# 取消梯度修正
with torch.no_grad():
hidden, cell = model.encoder(sentence_tensor)
# 先宣告 outputs ,然後裡面放一個開符號
outputs = [english.vocab.stoi["<sos>"]]
for _ in range(max_length):
previous_word = torch.LongTensor([outputs[-1]]).to(device)
# seq2seq 的decoder 會把上一個 hidden_state 跟 cell 當作input 這是觀念
# 然後output機率最大的
with torch.no_grad():
output, hidden, cell = model.decoder(previous_word, hidden, cell)
best_guess = output.argmax(1).item()
# 機率最大的數值再把它放進output
# 如果是結束字元 eos 的話就中斷不然會一直預測下去
if output.argmax(1).item() == english.vocab.stoi["<eos>"]:
# 再把 vactor 轉成文字, itos = integer to string
translated_sentence = [english.vocab.itos[idx] for idx in outputs]
# remove start token
return translated_sentence[1:]
然後再來是 bleu socre,這是常常被使用在 machine translation 上的計算方式,是用來計算翻譯品質的方法
def bleu(data, model, chinese, english, device):
targets = []
outputs = []
for example in data:
src = vars(example)["zh"]
trg = vars(example)["eng"]
prediction = translate_sentence(model, src, chinese, english, device)
prediction = prediction[:-1] # remove <eos> token
# bleu score 計算方法
return bleu_score(outputs, targets)
因為在訓練的過程中啊,像我這種比較客家的工程師就不會直接買Colab pro+ ,因為真的工作薪水就快養不活自己了,還要花錢享受 Colab 的服務,因此為了解決這個方法,我就會另外寫 checkpoint ,而我們前面就有提到怎麼寫,我就不多說了
還有其用途很簡單,因為我們這次訓練的是 machine translation ,所以會訓練很久,大約一小時才一個epoch,因此都要把訓練好的結果存起來,利用時間換取金錢XD
def save_checkpoint(state, filename="/content/drive/MyDrive/Colab Notebooks/ithome/seq2seq_cp.pth.tar"):
print("=> Saving checkpoint")
torch.save(state, filename)
def load_checkpoint(checkpoint, model, optimizer):
print("=> Loading checkpoint")
Encoder 的部分主要是要訓練 embedding 那一層
class Encoder(nn.Module):
def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
super(Encoder, self).__init__()
self.dropout = nn.Dropout(p)
self.hidden_size = hidden_size
self.num_layers = num_layers
self.embedding = nn.Embedding(input_size, embedding_size)
self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=p)
def forward(self, x):
# x shape: (seq_length, N) where N is batch size
embedding = self.dropout(self.embedding(x))
# embedding shape: (seq_length, N, embedding_size)
outputs, (hidden, cell) = self.rnn(embedding)
# outputs shape: (seq_length, N, hidden_size)
return hidden, cell
從encoder 的output 接起來後,每一次的input 是上個decoder 的output(hidden , cell),這邊要注意的是shape的大小,也是在訓練embedding
class Decoder(nn.Module):
def __init__(
self, input_size, embedding_size, hidden_size, output_size, num_layers, p
super(Decoder, self).__init__()
self.dropout = nn.Dropout(p)
self.hidden_size = hidden_size
self.num_layers = num_layers
self.embedding = nn.Embedding(input_size, embedding_size)
self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=p)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x, hidden, cell):
# x shape: (N) where N is for batch size, we want it to be (1, N), seq_length
# is 1 here because we are sending in a single word and not a sentence
x = x.unsqueeze(0)
embedding = self.dropout(self.embedding(x))
# embedding shape: (1, N, embedding_size)
outputs, (hidden, cell) = self.rnn(embedding, (hidden, cell))
# outputs shape: (1, N, hidden_size)
predictions = self.fc(outputs)
# predictions shape: (1, N, length_target_vocabulary) to send it to
# loss function we want it to be (N, length_target_vocabulary) so we're
# just gonna remove the first dim
predictions = predictions.squeeze(0)
return predictions, hidden, cell
注意一下 encoder 出來的只有 hidden ,cell ,然後decoder 第一個是 sos 開始解
另外 teacher_force_ratio
這比較特別我還不太懂他的意思,看意思有點像是用50%的機率用decoder 預測的數值,或者直接用正解的數值,因為怕一直錯下去,大概是這樣吧XD
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super(Seq2Seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
def forward(self, source, target, teacher_force_ratio=0.5):
batch_size = source.shape[1]
# output 英文就是英文句子的長度(會加pad)
target_len = target.shape[0]
target_vocab_size = len(english.vocab)
outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)
hidden, cell = self.encoder(source)
# Grab the first input to the Decoder which will be <SOS> token
x = target[0]
for t in range(1, target_len):
# Use previous hidden, cell as context from encoder at start
output, hidden, cell = self.decoder(x, hidden, cell)
# Store next output prediction
outputs[t] = output
# Get the best word the Decoder predicted (index in the vocabulary)
best_guess = output.argmax(1)
# With probability of teacher_force_ratio we take the actual next word
# otherwise we take the word that the Decoder predicted it to be.
# Teacher Forcing is used so that the model gets used to seeing
# similar inputs at training and testing time, if teacher forcing is 1
# then inputs at test time might be completely different than what the
# network is used to. This was a long comment.
x = target[t] if random.random() < teacher_force_ratio else best_guess
return outputs
checkPointPath = "/content/drive/MyDrive/Colab Notebooks/ithome/seq2seq_cp.pth.tar"
num_epochs = 100
learning_rate = 0.001
batch_size = 64
# Model hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size_encoder = len(chinese.vocab)
input_size_decoder = len(english.vocab)
output_size = len(english.vocab)
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024 # Needs to be the same for both RNN's
num_layers = 2
enc_dropout = 0.5
dec_dropout = 0.
# Tensorboard to get nice loss plot
writer = SummaryWriter(f"/content/drive/MyDrive/Colab Notebooks/ithome/runs/loss_plot")
step = 0
train_iterator, test_iterator = BucketIterator.splits(
(train_data, test_data),
sort_key = lambda x: len(x.zh),
這邊先把 encoder 跟 decoder 分開宣告
然後最後再一起丟進 seq2seq的 model
encoder_net = Encoder(input_size_encoder, encoder_embedding_size, hidden_size, num_layers, enc_dropout).to(device)
decoder_net = Decoder(
seq2seq 的model 就這樣被宣告,把剛剛宣告的 encoder 跟 decoder 放進來seq2seq裡面
model = Seq2Seq(encoder_net, decoder_net).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
pad_idx = english.vocab.stoi["<pad>"]
# CrossEntropy來計算誤差
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
# checkpoint很重要
if os.path.isfile(checkPointPath):
load_checkpoint(torch.load(checkPointPath ), model, optimizer)
這訓練中每次的epoch 都會看一下翻譯的結果,看有沒有越翻越好的狀況,所以會先丟進去翻譯,然後再來訓練
sentence = "你想不想要来我家看猫?或者一起看Netflix?"
for epoch in range(num_epochs):
print(f"[Epoch {epoch} / {num_epochs}]")
checkpoint = {"state_dict": model.state_dict(), "optimizer": optimizer.state_dict()}
translated_sentence = translate_sentence(
model, sentence, chinese, english, device, max_length=50
print(f"Translated example sentence: \n {translated_sentence}")
for batch_idx, batch in enumerate(train_iterator):
# Get input and targets and get to cuda
inp_data = batch.zh.to(device)
target = batch.eng.to(device)
# Forward prop
output = model(inp_data, target)
# Output is of shape (trg_len, batch_size, output_dim) but Cross Entropy Loss
# doesn't take input in that form. For example if we have MNIST we want to have
# output to be: (N, 10) and targets just (N). Here we can view it in a similar
# way that we have output_words * batch_size that we want to send in into
# our cost function, so we need to do some reshapin. While we're at it
# Let's also remove the start token while we're at it
output = output[1:].reshape(-1, output.shape[2])
target = target[1:].reshape(-1)
loss = criterion(output, target)
# Back prop
# Clip to avoid exploding gradient issues, makes sure grads are
# within a healthy range
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
# Gradient descent step
# Plot to tensorboard
writer.add_scalar("Training loss", loss, global_step=step)
step += 1
score = bleu(test_data[1:100], model, chinese, english, device)
print(f"Bleu score {score*100:.2f}")